/* Copyright (c) 2003-2008 MySQL AB
   Use is subject to license terms

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA */




/*****************************************************************************
Name:          Ndb.cpp
******************************************************************************/

#include <ndb_global.h>


#include "NdbApiSignal.hpp"
#include "NdbImpl.hpp"
#include <NdbOperation.hpp>
#include <NdbTransaction.hpp>
#include <NdbEventOperation.hpp>
#include <NdbEventOperationImpl.hpp>
#include <NdbRecAttr.hpp>
#include <md5_hash.hpp>
#include <NdbSleep.h>
#include <NdbOut.hpp>
#include <ndb_limits.h>
#include "API.hpp"
#include <NdbEnv.h>
#include <BaseString.hpp>
#include <NdbSqlUtil.hpp>

/****************************************************************************
NdbTransaction* doConnect(Uint32 tConNode);

Parameters:     tConNode: node to try first, 0 means no preferred node.
Return Value:   A connected NdbTransaction removed from the idle list of
                the node that was reached, NULL on failure.
Remark:         Tries tConNode (when non-zero) and then the other nodes,
                either through the cluster connection node iterator (when
                m_optimized_node_selection is set) or round robin over
                theDBnodes. A negative result from NDB_connect stops the
                search at once and returns NULL. If every attempt fails,
                theError.code is set to 4006 when some attempt returned a
                positive failure code and to 4009 when all returned 0.
****************************************************************************/
NdbTransaction* Ndb::doConnect(Uint32 tConNode) 
{
  DBUG_ENTER("Ndb::doConnect");

  Uint32 candidate;
  int connectResult = 0;
  bool failedOnLiveNode = false;

  // First choice: the node requested by the caller, if any.
  if (tConNode != 0)
  {
    connectResult = NDB_connect(tConNode);
    if (connectResult == 1 || connectResult == 2)
    {
      // A connection to the requested node is available, hand it out.
      DBUG_RETURN(getConnectedNdbTransaction(tConNode));
    }
    if (connectResult < 0)
    {
      DBUG_RETURN(NULL);
    }
    if (connectResult != 0)
    {
      failedOnLiveNode = true;
    }
  }

  // No luck with the requested node (or none requested): try the others.
  if (theImpl->m_optimized_node_selection)
  {
    Ndb_cluster_connection_node_iter &iter = theImpl->m_node_iter;
    theImpl->m_ndb_cluster_connection.init_get_next_node(iter);
    while ((candidate =
            theImpl->m_ndb_cluster_connection.get_next_node(iter)) != 0)
    {
      connectResult = NDB_connect(candidate);
      if (connectResult == 1 || connectResult == 2)
      {
        DBUG_RETURN(getConnectedNdbTransaction(candidate));
      }
      if (connectResult < 0)
      {
        DBUG_RETURN(NULL);
      }
      if (connectResult != 0)
      {
        failedOnLiveNode = true;
      }
      DBUG_PRINT("info",("tried node %d, TretCode %d, error code %d, %s",
                         candidate, connectResult, getNdbError().code,
                         getNdbError().message));
    }
  }
  else
  {
    // Plain round robin, continuing after the index used last time.
    const Uint32 nodeCount = theImpl->theNoOfDBnodes;
    Uint32 &cursor = theImpl->theCurrentConnectIndex;
    UintR attempts = 0;
    do
    {
      cursor++;
      if (cursor >= nodeCount)
      {
        cursor = 0;
      }

      candidate = theImpl->theDBnodes[cursor];
      connectResult = NDB_connect(candidate);
      if (connectResult == 1 || connectResult == 2)
      {
        DBUG_RETURN(getConnectedNdbTransaction(candidate));
      }
      if (connectResult < 0)
      {
        DBUG_RETURN(NULL);
      }
      if (connectResult != 0)
      {
        failedOnLiveNode = true;
      }
      DBUG_PRINT("info",("tried node %d TretCode %d", candidate, connectResult));
    } while (++attempts < nodeCount);
  }

  // Nothing could be connected. Report connection failure (4006) if some
  // node answered as alive but the connect failed, cluster failure (4009)
  // otherwise.
  if (failedOnLiveNode)
  {
#ifdef VM_TRACE
    ndbout << "TretCode = " << connectResult << endl;
#endif
    theError.code = 4006;
  }
  else
  {
    theError.code = 4009;
  }
  DBUG_RETURN(NULL);
}

/****************************************************************************
int NDB_connect(Uint32 tNode);

Parameters:     tNode: id of the node in which to seize a transaction
                       record in DBTC.
Return Value:    0: the transporter reports the node as not alive
                 1: a new connection was seized and placed in the idle
                    list of the node
                 2: the idle list of the node already holds a connection
                 3: the seize request failed
                 4: no connection object or signal object could be set up
                -1: the request failed with error 299 or 281; there is no
                    point in trying another node
Remark:         Sends TCSEIZEREQ and waits for the reply. Synchronous.
****************************************************************************/
int 
Ndb::NDB_connect(Uint32 tNode) 
{
  int	         tReturnCode;
  TransporterFacade *tp = theImpl->m_transporter_facade;

  DBUG_ENTER("Ndb::NDB_connect");

  bool nodeAvail = tp->get_node_alive(tNode);
  if(nodeAvail == false){
    DBUG_RETURN(0);
  }
  
  // An idle connection to this node already exists, nothing to seize.
  NdbTransaction * tConArray = theConnectionArray[tNode];
  if (tConArray != NULL) {
    DBUG_RETURN(2);
  }
  
  NdbTransaction * tNdbCon = getNdbCon();	// Get free connection object.
  if (tNdbCon == NULL) {
    DBUG_RETURN(4);
  }//if
  NdbApiSignal*	tSignal = getSignal();		// Get signal object
  if (tSignal == NULL) {
    releaseNdbCon(tNdbCon);
    DBUG_RETURN(4);
  }//if
  if (tSignal->setSignal(GSN_TCSEIZEREQ) == -1) {
    releaseNdbCon(tNdbCon);
    releaseSignal(tSignal);
    DBUG_RETURN(4);
  }//if
//************************************************
// Set connection pointer as NdbTransaction object
//************************************************
  tSignal->setData(tNdbCon->ptr2int(), 1);
  tSignal->setData(theMyRef, 2);	// Set my block reference
  tNdbCon->Status(NdbTransaction::Connecting); // Set status to connecting
  Uint32 nodeSequence;
  tReturnCode= sendRecSignal(tNode, WAIT_TC_SEIZE, tSignal,
                             0, &nodeSequence);
  releaseSignal(tSignal); 
  if ((tReturnCode == 0) && (tNdbCon->Status() == NdbTransaction::Connected)) {
    //************************************************
    // Send and receive was successful
    //************************************************
    NdbTransaction* tPrevFirst = theConnectionArray[tNode];
    tNdbCon->setConnectedNodeId(tNode, nodeSequence);
    
    tNdbCon->setMyBlockReference(theMyRef);
    theConnectionArray[tNode] = tNdbCon;
    tNdbCon->theNext = tPrevFirst;
    DBUG_RETURN(1);
  } else {
//****************************************************************************
// Unsuccessful connect is indicated by 3.
//****************************************************************************
    // Log the status while tNdbCon is still ours; it must not be touched
    // once it has been handed back through releaseNdbCon().
    DBUG_PRINT("info",
	       ("unsuccessful connect tReturnCode %d, tNdbCon->Status() %d",
		tReturnCode, tNdbCon->Status()));
    releaseNdbCon(tNdbCon);
    if (theError.code == 299 || // single user mode
        theError.code == 281 )  // cluster shutdown in progress
    {
      // no need to retry with other node
      DBUG_RETURN(-1);
    }
    DBUG_RETURN(3);
  }//if
}//Ndb::NDB_connect()

/*
  Detach and return the first connection in the idle list of nodeId.
  The list is not checked for emptiness here: the entry
  theConnectionArray[nodeId] is dereferenced unconditionally.
*/
NdbTransaction *
Ndb::getConnectedNdbTransaction(Uint32 nodeId){
  NdbTransaction* const head = theConnectionArray[nodeId];
  NdbTransaction* const remainder = head->theNext;

  head->theNext = NULL;
  theConnectionArray[nodeId] = remainder;

  return head;
}//Ndb::getConnectedNdbTransaction()

/*****************************************************************************
disconnect();

Remark:        Disconnect all connections to the database. 
*****************************************************************************/
void 
Ndb::doDisconnect()
{
  DBUG_ENTER("Ndb::doDisconnect");
  NdbTransaction* tNdbCon;
  CHECK_STATUS_MACRO_VOID;

  Uint32 tNoOfDbNodes = theImpl->theNoOfDBnodes;
  Uint8 *theDBnodes= theImpl->theDBnodes;
  DBUG_PRINT("info", ("theNoOfDBnodes=%d", tNoOfDbNodes));
  UintR i;
  for (i = 0; i < tNoOfDbNodes; i++) {
    Uint32 tNode = theDBnodes[i];
    tNdbCon = theConnectionArray[tNode];
    while (tNdbCon != NULL) {
      NdbTransaction* tmpNdbCon = tNdbCon;
      tNdbCon = tNdbCon->theNext;
      releaseConnectToNdb(tmpNdbCon);
    }//while
  }//for
  tNdbCon = theTransactionList;
  while (tNdbCon != NULL) {
    NdbTransaction* tmpNdbCon = tNdbCon;
    tNdbCon = tNdbCon->theNext;
    releaseConnectToNdb(tmpNdbCon);
  }//while
  DBUG_VOID_RETURN;
}//Ndb::disconnect()

/*****************************************************************************
int waitUntilReady(int timeout);

Parameters:     timeout: maximum time to wait, in seconds.
Return Value:   Returns 0 if the Ndb is ready within timeout seconds.
                Returns -1 otherwise, with theError.code set to
                4256 (Ndb::init has not been called),
                4269 (theNode still 0 when the timeout expired) or
                4009 (the cluster connection did not become ready).
Remark:         Polls theNode every 100 ms until it is non-zero, then
                hands the remaining seconds to the cluster connection.
*****************************************************************************/ 
int
Ndb::waitUntilReady(int timeout)
{
  DBUG_ENTER("Ndb::waitUntilReady");

  if (theInitState != Initialised) {
    // Ndb::init is not called
    theError.code = 4256;
    DBUG_RETURN(-1);
  }

  int elapsedSeconds = 0;
  int ticks = 0;                  // 100 ms sleeps done in the current second
  while (theNode == 0) {
    if (elapsedSeconds >= timeout)
    {
      theError.code = 4269;
      DBUG_RETURN(-1);
    }
    NdbSleep_MilliSleep(100);
    if (++ticks == 10) {
      elapsedSeconds++;
      ticks = 0;
    }//if
  }

  const int remaining = timeout - elapsedSeconds;
  if (theImpl->m_ndb_cluster_connection.wait_until_ready(remaining, 30) < 0)
  {
    theError.code = 4009;
    DBUG_RETURN(-1);
  }

  DBUG_RETURN(0);
}

/*****************************************************************************
NdbTransaction* startTransaction();

Return Value:   Returns a pointer to a connection object.
                Return NULL otherwise.
Remark:         Start transaction. Synchronous.
*****************************************************************************/ 
/*****************************************************************************
int computeHash(Uint32 *retval, const NdbDictionary::Table *table,
                const struct Key_part_ptr *keyData,
                void *buf, Uint32 bufLen);

Parameters:     retval  (OUT) : receives the hash value, may be NULL
                table   (IN)  : table whose distribution key is hashed
                keyData (IN)  : one entry per key part, followed by an
                                entry whose ptr is 0
                buf     (IN)  : scratch buffer, or 0 to let the function
                                allocate one with malloc
                bufLen  (IN)  : size of buf in bytes (ignored if buf is 0)
Return Value:   0 on success, otherwise one of the codes
                4316 (a key part has a null ptr),
                4276 (the entry after the last key part is not null),
                4277 (a key part is shorter than its encoded length),
                4278 (buf is too small),
                4279 (string transformation failed),
                4280 (malformed key),
                4000 (malloc failed).
Remark:         Each key part is copied (or, for columns with a charset,
                transformed with strnxfrm) into an 8-byte aligned buffer,
                zero padded to a multiple of 4 bytes, and the result is run
                through md5_hash.
*****************************************************************************/
int
Ndb::computeHash(Uint32 *retval,
                 const NdbDictionary::Table *table,
                 const struct Key_part_ptr * keyData, 
                 void* buf, Uint32 bufLen)
{
  Uint32 j = 0;
  Uint32 sumlen = 0; // Needed len
  const NdbTableImpl* impl = &NdbTableImpl::getImpl(*table);
  const NdbColumnImpl* const * cols = impl->m_columns.getBase();
  Uint32 len;
  char* pos;
  bool ownsBuf = false; // true when buf was allocated here and must be freed

  Uint32 colcnt = impl->m_columns.size();
  Uint32 parts = impl->m_noOfDistributionKeys;
  if (parts == 0)
  {
    parts = impl->m_noOfKeys;
  }

  for (Uint32 i = 0; i<parts; i++)
  {
    if (unlikely(keyData[i].ptr == 0))
      goto enullptr;
  }

  // The key part array has to be terminated by an entry with a null ptr.
  if (unlikely(keyData[parts].ptr != 0))
    goto emissingnullptr;

  const NdbColumnImpl* partcols[NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY];
  for (Uint32 i = 0; i<colcnt && j < parts; i++)
  {
    if (cols[i]->m_distributionKey)
    {
      // wl3717_todo
      // char allowed now as dist key so this case should be tested
      partcols[j++] = cols[i];
    }
  }

  // Pass 1: validate every key part and add up the space needed.
  for (Uint32 i = 0; i<parts; i++)
  {
    Uint32 lb, len;
    if (unlikely(!NdbSqlUtil::get_var_length(partcols[i]->m_type, 
                                             keyData[i].ptr, 
                                             keyData[i].len,
                                             lb, len)))
      goto emalformedkey;

    if (unlikely(keyData[i].len < (lb + len)))
      goto elentosmall;
    
    Uint32 maxlen = (partcols[i]->m_attrSize * partcols[i]->m_arraySize);

    if (unlikely(lb == 0 && keyData[i].len != maxlen))
      goto emalformedkey;
    
    if (partcols[i]->m_cs)
    {
      Uint32 xmul = partcols[i]->m_cs->strxfrm_multiply;
      xmul = xmul ? xmul : 1;
      len = xmul * (maxlen - lb);
    }

    len = (lb + len + 3) & ~(Uint32)3;
    sumlen += len;

  }
  
  if (buf)
  {
    // Align the caller's buffer to 8 bytes for md5_hash.
    UintPtr org = UintPtr(buf);
    UintPtr use = (org + 7) & ~(UintPtr)7;
    Uint32 slack = Uint32(use - org);

    // Without this check bufLen - slack would wrap around for a buffer
    // shorter than the alignment slack and the size test below would pass.
    if (unlikely(bufLen < slack))
      goto ebuftosmall;

    buf = (void*)use;
    bufLen -= slack;

    if (unlikely(sumlen > bufLen))
      goto ebuftosmall;
  }
  else
  {
    buf = malloc(sumlen);
    if (unlikely(buf == 0))
      goto enomem;
    ownsBuf = true;
    assert((UintPtr(buf) & 7) == 0);
  }
  
  // Pass 2: build the hash input.
  pos = (char*)buf;
  for (Uint32 i = 0; i<parts; i++)
  {
    Uint32 lb, len;
    NdbSqlUtil::get_var_length(partcols[i]->m_type, 
                               keyData[i].ptr, keyData[i].len, lb, len);
    CHARSET_INFO* cs;
    if ((cs = partcols[i]->m_cs))
    {
      Uint32 xmul = cs->strxfrm_multiply;
      if (xmul == 0)
        xmul = 1;
      /*
       * Varchar end-spaces are ignored in comparisons.  To get same hash
       * we blank-pad to maximum length via strnxfrm.
       */
      Uint32 maxlen = (partcols[i]->m_attrSize * partcols[i]->m_arraySize);
      Uint32 dstLen = xmul * (maxlen - lb);
      int n = NdbSqlUtil::strnxfrm_bug7284(cs, 
                                           (unsigned char*)pos, 
                                           dstLen, 
                                           ((unsigned char*)keyData[i].ptr)+lb,
                                           len);
      
      if (unlikely(n == -1))
        goto emalformedstring;
      
      // Zero pad up to the next 4-byte boundary.
      while ((n & 3) != 0) 
      {
        pos[n++] = 0;
      }
      pos += n;
    }
    else
    {
      // No charset: copy length bytes and data as they are, then zero pad.
      len += lb;
      memcpy(pos, keyData[i].ptr, len);
      while (len & 3)
      {
        * (pos + len++) = 0;
      }
      pos += len;
    }
  }
  len = UintPtr(pos) - UintPtr(buf);
  assert((len & 3) == 0);

  Uint32 values[4];
  md5_hash(values, (const Uint64*)buf, len >> 2);
  
  if (retval)
  {
    * retval = values[1];
  }
  
  if (ownsBuf)
    free(buf);
  
  return 0;
  
enullptr:
  return 4316;
  
emissingnullptr:
  return 4276;

elentosmall:
  return 4277;

ebuftosmall:
  return 4278;

emalformedstring:
  if (ownsBuf)
    free(buf);
  
  return 4279;
  
emalformedkey:
  return 4280;

enomem:
  return 4000;
}

/*****************************************************************************
NdbTransaction* startTransaction(const NdbDictionary::Table *table,
                                 const char *keyData, Uint32 keyLen);

Parameters:     table   : table used to pick a node, may be 0
                keyData : key bytes used as a hint, may be 0
                keyLen  : length of keyData in bytes
Return Value:   Returns a pointer to a connection object.
                Returns NULL otherwise, also when the Ndb object is not
                initialised.
Remark:         Start transaction. Synchronous.
                If both table and keyData are given, the key is hashed and
                the first node returned by the table for that hash is
                tried first. A key that is unaligned and too long for the
                local copy buffer is not hashed; the transaction is then
                started without a node preference.
*****************************************************************************/
NdbTransaction* 
Ndb::startTransaction(const NdbDictionary::Table *table,
		      const char * keyData, Uint32 keyLen)
{
  DBUG_ENTER("Ndb::startTransaction");

  if (theInitState == Initialised) {
    theError.code = 0;
    checkFailedNode();
    /**
     * If the user supplied key data
     * We will make a qualified guess to which node is the primary for the
     * the fragment and contact that node
     */
    Uint32 nodeId = 0;
    NdbTableImpl* impl;
    if(table != 0 && keyData != 0 && (impl= &NdbTableImpl::getImpl(*table))) 
    {
      Uint32 buf[4];
      bool hashed = true;
      if((UintPtr(keyData) & 7) == 0 && (keyLen & 3) == 0)
      {
        // Key is 8-byte aligned and a whole number of words: hash in place.
        md5_hash(buf, (const Uint64*)keyData, keyLen >> 2);
      }
      else
      {
        // Hash an aligned, zero padded copy of the key.
        Uint64 tmp[1000];
        if (keyLen < sizeof(tmp))
        {
          tmp[keyLen/8] = 0;
          memcpy(tmp, keyData, keyLen);
          md5_hash(buf, tmp, (keyLen+3) >> 2);
        }
        else
        {
          // The copy would not fit in tmp. The key is only a hint, so
          // skip it instead of writing past the end of the stack buffer.
          hashed = false;
        }
      }
      if (hashed)
      {
        const Uint16 *nodes;
        Uint32 cnt= impl->get_nodes(buf[1], &nodes);
        if(cnt)
          nodeId= nodes[0];
      }
    }//if

    {
      NdbTransaction *trans= startTransactionLocal(0, nodeId);
      DBUG_PRINT("exit",("start trans: 0x%lx  transid: 0x%lx",
			 (long) trans,
                         (long) (trans ? trans->getTransactionId() : 0)));
      DBUG_RETURN(trans);
    }
  } else {
    DBUG_RETURN(NULL);
  }//if
}//Ndb::startTransaction()

/*****************************************************************************
NdbTransaction* hupp(NdbTransaction* pBuddyTrans);

Parameters:     pBuddyTrans: transaction to join. If NULL, the call is
                             forwarded to startTransaction().
Return Value:   Returns a pointer to a connection object, connected to the
                same node as pBuddyTrans and using the same transaction id.
                Returns NULL if the Ndb object is not initialised, if no
                transaction could be started, or if the new transaction
                ended up on another node (theError.code is then 4006).
Remark:         Start transaction. Synchronous.
*****************************************************************************/ 
NdbTransaction* 
Ndb::hupp(NdbTransaction* pBuddyTrans)
{
  DBUG_ENTER("Ndb::hupp");

  DBUG_PRINT("enter", ("trans: 0x%lx", (long) pBuddyTrans));

  if (pBuddyTrans == NULL)
  {
    DBUG_RETURN(startTransaction());
  }

  if (theInitState != Initialised)
  {
    DBUG_RETURN(NULL);
  }

  theError.code = 0;
  checkFailedNode();

  const Uint32 buddyNode = pBuddyTrans->getConnectedNodeId();
  NdbTransaction* const newTrans = startTransactionLocal(0, buddyNode);
  if (newTrans == NULL)
  {
    DBUG_RETURN(NULL);
  }

  if (newTrans->getConnectedNodeId() != buddyNode)
  {
    // The connection went to some other node than the buddy's:
    // give it back and report the failure.
    closeTransaction(newTrans);
    theError.code = 4006;
    DBUG_RETURN(NULL);
  }

  newTrans->setTransactionId(pBuddyTrans->getTransactionId());
  newTrans->setBuddyConPtr((Uint32)pBuddyTrans->getTC_ConnectPtr());
  DBUG_PRINT("exit", ("hupp trans: 0x%lx transid: 0x%lx",
                      (long) newTrans,
                      (long) newTrans->getTransactionId()));
  DBUG_RETURN(newTrans);
}//Ndb::hupp()

/*****************************************************************************
NdbTransaction* startTransactionLocal(Uint32 aPriority, Uint32 nodeId);

Parameters:     aPriority: stored in thePriority of the new transaction
                nodeId   : node to try first, 0 for no preference. In
                           VM_TRACE builds the environment variable
                           NDB_TRANSACTION_NODE_ID overrides it.
Return Value:   The new transaction, linked first in theTransactionList,
                or NULL. theError.code is 4006 when
                theRemainingStartTransactions is 0; otherwise it is set by
                doConnect() or copied from the connection if init() fails.
Remark:         Assigns theFirstTransId to the transaction and advances it,
                wrapping the low 32 bits back to 0 after 0xFFFFFFFF.
                theRemainingStartTransactions is only decremented once the
                transaction has been initialised, since a caller that gets
                NULL back has nothing to pass to closeTransaction().
*****************************************************************************/
NdbTransaction* 
Ndb::startTransactionLocal(Uint32 aPriority, Uint32 nodeId)
{
#ifdef VM_TRACE
  char buf[255];
  const char* val = NdbEnv_GetEnv("NDB_TRANSACTION_NODE_ID", buf, 255);
  if(val != 0){
    nodeId = atoi(val);
  }
#endif

  DBUG_ENTER("Ndb::startTransactionLocal");
  DBUG_PRINT("enter", ("nodeid: %d", nodeId));

  if(unlikely(theRemainingStartTransactions == 0))
  {
    theError.code = 4006;
    DBUG_RETURN(0);
  }
  
  NdbTransaction* tConnection;
  Uint64 tFirstTransId = theFirstTransId;
  tConnection = doConnect(nodeId);
  if (tConnection == NULL) {
    DBUG_RETURN(NULL);
  }//if

  NdbTransaction* tConNext = theTransactionList;
  if (tConnection->init())
  {
    // NOTE(review): tConnection has been taken out of the idle list by
    // doConnect() and is not put back or released here - confirm whether
    // it is safe to reuse a connection whose init() failed.
    theError.code = tConnection->theError.code;
    DBUG_RETURN(NULL);
  }
  // Count the transaction against the quota only now that it will really
  // be handed out; closeTransaction() gives the slot back.
  theRemainingStartTransactions--;
  theTransactionList = tConnection;        // into a transaction list.
  tConnection->next(tConNext);   // Add the active connection object
  tConnection->setTransactionId(tFirstTransId);
  tConnection->thePriority = aPriority;
  if ((tFirstTransId & 0xFFFFFFFF) == 0xFFFFFFFF) {
    //---------------------------------------------------
    // Transaction id rolling round. We will start from
    // consecutive identity 0 again.
    //---------------------------------------------------
    theFirstTransId = ((tFirstTransId >> 32) << 32);      
  } else {
    theFirstTransId = tFirstTransId + 1;
  }//if
#ifdef VM_TRACE
  if (tConnection->theListState != NdbTransaction::NotInList) {
    printState("startTransactionLocal %x", tConnection);
    abort();
  }
#endif
  DBUG_RETURN(tConnection);
}//Ndb::startTransactionLocal()

/*****************************************************************************
void closeTransaction(NdbTransaction* aConnection);

Parameters:     aConnection: the connection used in the transaction.
Remark:         Close transaction by releasing the connection and all operations.
                The connection is unlinked from theTransactionList and
                release() is called on it. After that it is either put first
                in the idle list of its node or, if theReleaseOnClose is set,
                handed to releaseNdbCon(). A connection whose error code is
                4008 is deliberately not returned to any list.
*****************************************************************************/
void
Ndb::closeTransaction(NdbTransaction* aConnection)
{
  DBUG_ENTER("Ndb::closeTransaction");
  NdbTransaction* tCon;
  NdbTransaction* tPreviousCon;

  if (aConnection == NULL) {
//-----------------------------------------------------
// closeTransaction called on NULL pointer, destructive
// application behaviour.
//-----------------------------------------------------
#ifdef VM_TRACE
    printf("NULL into closeTransaction\n");
#endif
    DBUG_VOID_RETURN;
  }//if
  CHECK_STATUS_MACRO_VOID;
  
  tCon = theTransactionList;
  // NOTE(review): the counter is incremented before the list search below,
  // so it also goes up when aConnection turns out not to be in the list -
  // confirm that this is intended.
  theRemainingStartTransactions++;
  
  DBUG_PRINT("info",("close trans: 0x%lx  transid: 0x%lx",
                     (long) aConnection,
                     (long) aConnection->getTransactionId()));
  DBUG_PRINT("info",("magic number: 0x%x TCConPtr: 0x%x theMyRef: 0x%x 0x%x",
		     aConnection->theMagicNumber, aConnection->theTCConPtr,
		     aConnection->theMyRef, getReference()));

  if (aConnection == tCon) {		// Remove the active connection object
    theTransactionList = tCon->next();	// from the transaction list.
  } else { 
    // Walk the list until aConnection is found, remembering the element
    // before it so that it can be unlinked.
    while (aConnection != tCon) {
      if (tCon == NULL) {
//-----------------------------------------------------
// closeTransaction called on non-existing transaction
//-----------------------------------------------------

	if(aConnection->theError.code == 4008){
	  /**
	   * When a SCAN timed-out, returning the NdbTransaction leads
	   * to reuse. And TC crashes when the API tries to reuse it to
	   * something else...
	   */
#ifdef VM_TRACE
	  printf("Scan timeout:ed NdbTransaction-> "
		 "not returning it-> memory leak\n");
#endif
	  DBUG_VOID_RETURN;
	}

#ifdef VM_TRACE
	printf("Non-existing transaction into closeTransaction\n");
	abort();
#endif
	DBUG_VOID_RETURN;
      }//if
      tPreviousCon = tCon;
      tCon = tCon->next();
    }//while
    // tPreviousCon is always set here: this branch is only entered when
    // aConnection is not the list head, so the loop ran at least once.
    tPreviousCon->next(tCon->next());
  }//if
  
  aConnection->release();
  
  if(aConnection->theError.code == 4008){
    /**
     * Something timed-out, returning the NdbTransaction leads
     * to reuse. And TC crashes when the API tries to reuse it to
     * something else...
     */
#ifdef VM_TRACE
    printf("Con timeout:ed NdbTransaction-> not returning it-> memory leak\n");
#endif
    DBUG_VOID_RETURN;
  }
  
  if (aConnection->theReleaseOnClose == false) {
    /**
     * Put it back in idle list for that node
     */
    Uint32 nodeId = aConnection->getConnectedNodeId();
    aConnection->theNext = theConnectionArray[nodeId];
    theConnectionArray[nodeId] = aConnection;
    DBUG_VOID_RETURN;
  } else {
    // Flagged for release: clear the flag and give the object back
    // through releaseNdbCon() instead of keeping it in the idle list.
    aConnection->theReleaseOnClose = false;
    releaseNdbCon(aConnection);
  }//if
  DBUG_VOID_RETURN;
}//Ndb::closeTransaction()

/*****************************************************************************
int NdbTamper(TamperType aAction, int aNode);

Parameters: aAction     Specifies what action to be taken
            LockGlbChp     (1): Lock global checkpointing
            UnlockGlbChp   (2): Unlock global checkpointing
            CrashNode      (3): Crash node
            ReadRestartGCI (4): Read the restart GCI

            aNode       Node the signal is sent to for actions 1, 2 and 3.
                        Not used for action 4, which asks any alive node.

Return Value: -1 on error. theError.code is 4102 for an unknown action,
                 4000 if no connection object is available, 4002 or 4009
                 if no node is alive.
              Actions 1 and 2 return the result of sendSignal().
              Action 3 returns 0.
              Action 4 returns theRestartGCI.
              Always -1 when built with CUSTOMER_RELEASE.

Remark:         Sends a DIHNDBTAMPER signal.
*****************************************************************************/ 
int 
Ndb::NdbTamper(TamperType aAction, int aNode)
{
  NdbTransaction*	tNdbConn;
  NdbApiSignal		tSignal(theMyRef);
  int			tNode;
  int                   tAction;
  int			ret_code;

#ifdef CUSTOMER_RELEASE
  return -1;
#else
  DBUG_ENTER("Ndb::NdbTamper");
  CHECK_STATUS_MACRO;
  checkFailedNode();

  theRestartGCI = 0;
  switch (aAction) {
// Translate enum to integer. This is done because the SCI layer
// expects integers. 
     case LockGlbChp:
        tAction = 1;
        break;
     case UnlockGlbChp:
        tAction = 2;
	break;
     case CrashNode:
        tAction = 3;
        break;
     case ReadRestartGCI:
	tAction = 4;
	break;
     default:
        theError.code = 4102;
        DBUG_RETURN(-1);
  }

  tNdbConn = getNdbCon();	// Get free connection object
  if (tNdbConn == NULL) {
    theError.code = 4000;
    DBUG_RETURN(-1);
  }
  tSignal.setSignal(GSN_DIHNDBTAMPER);
  tSignal.setData (tAction, 1);
  tSignal.setData(tNdbConn->ptr2int(),2);
  tSignal.setData(theMyRef,3);		// Set return block reference
  tNdbConn->Status(NdbTransaction::Connecting); // Set status to connecting
  TransporterFacade *tp = theImpl->m_transporter_facade;
  if (tAction == 3) {
    // Crash node: fire and forget.
    tp->lock_mutex();
    tp->sendSignal(&tSignal, aNode);
    tp->unlock_mutex();
    releaseNdbCon(tNdbConn);
  } else if ( (tAction == 2) || (tAction == 1) ) {
    tp->lock_mutex();
    tNode = tp->get_an_alive_node();
    if (tNode == 0) {
      // The mutex must not stay locked on this early exit.
      tp->unlock_mutex();
      theError.code = 4002;
      releaseNdbCon(tNdbConn);
      DBUG_RETURN(-1);
    }//if
    // NOTE(review): tNode is only used as an "is any node alive" check;
    // the signal itself goes to aNode - confirm that this is intended.
    ret_code = tp->sendSignal(&tSignal,aNode);
    tp->unlock_mutex();
    releaseNdbCon(tNdbConn);
    DBUG_RETURN(ret_code);
  } else {
    // Read restart GCI: ask an alive node and wait for the answer, trying
    // again for as long as the failure is -5 or -2.
    do {
      tp->lock_mutex();
      // Start protected area
      tNode = tp->get_an_alive_node();
      tp->unlock_mutex();
      // End protected area
      if (tNode == 0) {
        theError.code = 4009;
        releaseNdbCon(tNdbConn);
        DBUG_RETURN(-1);
      }//if
      ret_code = sendRecSignal(tNode, WAIT_NDB_TAMPER, &tSignal, 0);
      if (ret_code == 0) {  
        if (tNdbConn->Status() != NdbTransaction::Connected) {
          theRestartGCI = 0;
        }//if
        releaseNdbCon(tNdbConn);
        DBUG_RETURN(theRestartGCI);
      } else if ((ret_code == -5) || (ret_code == -2)) {
        TRACE_DEBUG("Continue DIHNDBTAMPER when node failed/stopping");
      } else {
        // Give the connection object back on this exit as on all others.
        releaseNdbCon(tNdbConn);
        DBUG_RETURN(-1);
      }//if
    } while (1);
  }
  DBUG_RETURN(0);
#endif
}
#if 0
/*
  The two schema transaction functions below are compiled out by the
  surrounding "#if 0".
*/
/****************************************************************************
NdbSchemaCon* startSchemaTransaction();

Return Value:   Returns a pointer to a schema connection object.
                Returns NULL otherwise, with theError.code set to 4321 if
                a schema connection is already registered on this Ndb
                object, or 4000 if the allocation failed.
Remark:         Start schema transaction. Synchronous.
****************************************************************************/ 
NdbSchemaCon* 
Ndb::startSchemaTransaction()
{
  NdbSchemaCon* tSchemaCon;
  // Only one schema connection at a time per Ndb object.
  if (theSchemaConToNdbList != NULL) {
    theError.code = 4321;
    return NULL;
  }//if
  tSchemaCon = new NdbSchemaCon(this);
  if (tSchemaCon == NULL) {
    theError.code = 4000;
    return NULL;
  }//if 
  theSchemaConToNdbList = tSchemaCon;
  return tSchemaCon;  
}
/*****************************************************************************
void closeSchemaTransaction(NdbSchemaCon* aSchemaCon);

Parameters:     aSchemaCon: the schemacon used in the transaction.
Remark:         Close transaction by releasing the schemacon and all schemaop.
                Aborts the process if aSchemaCon is not the schema
                connection registered on this Ndb object.
*****************************************************************************/
void
Ndb::closeSchemaTransaction(NdbSchemaCon* aSchemaCon)
{
  if (theSchemaConToNdbList != aSchemaCon) {
    abort();
    return;
  }//if
  aSchemaCon->release();
  delete aSchemaCon;
  theSchemaConToNdbList = NULL;
  return;
}//Ndb::closeSchemaTransaction()
#endif

/*****************************************************************************
void RestartGCI(int aRestartGCI);

Remark:		Set theRestartGCI on the NDB object
*****************************************************************************/
void
Ndb::RestartGCI(int aRestartGCI)
{
  theRestartGCI = aRestartGCI;
}

/****************************************************************************
int getBlockNumber(void);

Remark:		
****************************************************************************/
int
Ndb::getBlockNumber()
{
  return theNdbBlockNumber;
}

/*
  Return the dictionary object held in theDictionary.
*/
NdbDictionary::Dictionary *
Ndb::getDictionary() const
{
  return this->theDictionary;
}

/****************************************************************************
int getNodeId();

Remark:		
****************************************************************************/
int
Ndb::getNodeId()
{
  return theNode;
}

/****************************************************************************
Uint64 getAutoIncrementValue( const char* aTableName,
                              Uint64 & autoValue, 
                              Uint32 cacheSize, 
                              Uint64 step,
                              Uint64 start);

Parameters:     aTableName (IN) : The table name.
                autoValue (OUT) : Returns new autoincrement value
                cacheSize  (IN) : Prefetch this many values
                step       (IN) : Specifies the step between the 
                                  autoincrement values.
                start      (IN) : Start value for first value
Returns:        0 if succesful, -1 if error encountered
Remark:		Returns a new autoincrement value to the application.
                The autoincrement values can be increased by steps
                (default 1) and a number of values can be prefetched
                by specifying cacheSize (default 10).
****************************************************************************/
int
Ndb::getAutoIncrementValue(const char* aTableName,
                           Uint64 & autoValue, Uint32 cacheSize,
                           Uint64 step, Uint64 start)
{
  DBUG_ENTER("Ndb::getAutoIncrementValue");
  ASSERT_NOT_MYSQLD;
  BaseString internal_tabname(internalize_table_name(aTableName));

  Ndb_local_table_info *info=
    theDictionary->get_local_table_info(internal_tabname);
  if (info == 0) {
    theError.code = theDictionary->getNdbError().code;
    DBUG_RETURN(-1);
  }
  const NdbTableImpl* table = info->m_table_impl;
  TupleIdRange & range = info->m_tuple_id_range;
  if (getTupleIdFromNdb(table, range, autoValue, cacheSize, step, start) == -1)
    DBUG_RETURN(-1);
  DBUG_PRINT("info", ("value %lu", (ulong) autoValue));
  DBUG_RETURN(0);
}

/*
  Same as getAutoIncrementValue(const char*, ...) but takes the table
  object instead of its name. The local table info is looked up through
  the internal name of the table; the tuple id range stored there is used.
  Returns 0 if successful, -1 if error encountered.
*/
int
Ndb::getAutoIncrementValue(const NdbDictionary::Table * aTable,
                           Uint64 & autoValue, Uint32 cacheSize,
                           Uint64 step, Uint64 start)
{
  DBUG_ENTER("Ndb::getAutoIncrementValue");
  ASSERT_NOT_MYSQLD;
  assert(aTable != 0);
  const NdbTableImpl* const tabImpl = & NdbTableImpl::getImpl(*aTable);

  Ndb_local_table_info * const tabInfo =
    theDictionary->get_local_table_info(tabImpl->m_internalName);
  if (tabInfo == 0)
  {
    theError.code = theDictionary->getNdbError().code;
    DBUG_RETURN(-1);
  }

  const int rc = getTupleIdFromNdb(tabImpl, tabInfo->m_tuple_id_range,
                                   autoValue, cacheSize, step, start);
  if (rc == -1)
  {
    DBUG_RETURN(-1);
  }
  DBUG_PRINT("info", ("value %lu", (ulong)autoValue));
  DBUG_RETURN(0);
}

/*
  Variant of getAutoIncrementValue() where the caller supplies the tuple
  id range to use, so no dictionary lookup is made.
  Returns 0 if successful, -1 if error encountered.
*/
int
Ndb::getAutoIncrementValue(const NdbDictionary::Table * aTable,
                           TupleIdRange & range, Uint64 & autoValue,
                           Uint32 cacheSize, Uint64 step, Uint64 start)
{
  DBUG_ENTER("Ndb::getAutoIncrementValue");
  assert(aTable != 0);
  const NdbTableImpl* const tabImpl = & NdbTableImpl::getImpl(*aTable);

  const int rc = getTupleIdFromNdb(tabImpl, range, autoValue,
                                   cacheSize, step, start);
  if (rc == -1)
  {
    DBUG_RETURN(-1);
  }
  DBUG_PRINT("info", ("value %lu", (ulong)autoValue));
  DBUG_RETURN(0);
}

int
Ndb::getTupleIdFromNdb(const NdbTableImpl* table,
                       TupleIdRange & range, Uint64 & tupleId, 
                       Uint32 cacheSize, Uint64 step, Uint64 start)
{
/*
  Returns a new TupleId to the application.
  The TupleId comes from SYSTAB_0 where SYSKEY_0 = TableId.
  It is initialized to (TableId << 48) + 1 in NdbcntrMain.cpp.
  In most cases step= start= 1, in which case we get:
  1,2,3,4,5,...
  If step=10 and start=5 and first number is 1, we get:
  5,15,25,35,...  
*/
  DBUG_ENTER("Ndb::getTupleIdFromNdb");
  /*
   Check if the next value can be taken from the pre-fetched
   sequence.
  */
  if (range.m_first_tuple_id != range.m_last_tuple_id &&
      range.m_first_tuple_id + step <= range.m_last_tuple_id)
  {
    assert(range.m_first_tuple_id < range.m_last_tuple_id);
    range.m_first_tuple_id += step; 
    tupleId = range.m_first_tuple_id;
    DBUG_PRINT("info", ("Next cached value %lu", (ulong) tupleId));
  }
  else
  {
    /*
      If start value is greater than step it is ignored
     */
    Uint64 offset = (start > step) ? 1 : start;

    /*
      Pre-fetch a number of values depending on cacheSize
     */
    if (cacheSize == 0)
      cacheSize = 1;

    DBUG_PRINT("info", ("reading %u values from database", (uint)cacheSize));
    /*
     * reserve next cacheSize entries in db.  adds cacheSize to NEXTID
     * and returns first tupleId in the new range. If tupleId's are
     * incremented in steps then multiply the cacheSize with step size.
     */
    Uint64 opValue = cacheSize * step;

    if (opTupleIdOnNdb(table, range, opValue, 0) == -1)
      DBUG_RETURN(-1);
    DBUG_PRINT("info", ("Next value fetched from database %lu", (ulong) opValue));
    DBUG_PRINT("info", ("Increasing %lu by offset %lu, increment  is %lu", (ulong) (ulong) opValue, (ulong) offset, (ulong) step));
    Uint64 current, next;
    Uint64 div = ((Uint64) (opValue + step - offset)) / step;
    next = div * step + offset;
    current = (next < step) ? next : next - step;
    tupleId = (opValue <= current) ? current : next;
    DBUG_PRINT("info", ("Returning %lu", (ulong) tupleId));
    range.m_first_tuple_id = tupleId;
  }
  DBUG_RETURN(0);
}

/****************************************************************************
int readAutoIncrementValue( const char* aTableName,
                            Uint64 & autoValue);

Parameters:     aTableName (IN) : The table name.
                autoValue  (OUT) : The current autoincrement value
Returns:        0 if succesful, -1 if error encountered
Remark:         Returns the current autoincrement value to the application.
****************************************************************************/
int
Ndb::readAutoIncrementValue(const char* aTableName,
                            Uint64 & autoValue)
{
  DBUG_ENTER("Ndb::readAutoIncrementValue");
  ASSERT_NOT_MYSQLD;
  BaseString internal_tabname(internalize_table_name(aTableName));

  Ndb_local_table_info *info=
    theDictionary->get_local_table_info(internal_tabname);
  if (info == 0) {
    theError.code = theDictionary->getNdbError().code;
    DBUG_RETURN(-1);
  }
  const NdbTableImpl* table = info->m_table_impl;
  TupleIdRange & range = info->m_tuple_id_range;
  if (readTupleIdFromNdb(table, range, autoValue) == -1)
    DBUG_RETURN(-1);
  DBUG_PRINT("info", ("value %lu", (ulong)autoValue));
  DBUG_RETURN(0);
}

/*
  Read the current auto-increment value for the table given as a
  dictionary object.  The tuple id range kept in the dictionary's local
  table info (looked up by the table's internal name) is used as cache.

  Returns 0 on success and -1 on failure; when the local table info is
  not found, theError.code is copied from the dictionary error.
*/
int
Ndb::readAutoIncrementValue(const NdbDictionary::Table * aTable,
                            Uint64 & autoValue)
{
  DBUG_ENTER("Ndb::readAutoIncrementValue");
  ASSERT_NOT_MYSQLD;
  assert(aTable != 0);

  const NdbTableImpl* const tableImpl = & NdbTableImpl::getImpl(*aTable);
  Ndb_local_table_info* const tableInfo =
    theDictionary->get_local_table_info(tableImpl->m_internalName);
  if (tableInfo == 0)
  {
    theError.code = theDictionary->getNdbError().code;
    DBUG_RETURN(-1);
  }

  const int rc = readTupleIdFromNdb(tableImpl,
                                    tableInfo->m_tuple_id_range,
                                    autoValue);
  if (rc == -1)
    DBUG_RETURN(-1);

  DBUG_PRINT("info", ("value %lu", (ulong)autoValue));
  DBUG_RETURN(0);
}

/*
  Read the current auto-increment value for aTable using a tuple id
  range supplied by the caller instead of the one kept in the local
  table info.  Returns 0 on success, -1 on failure.
*/
int
Ndb::readAutoIncrementValue(const NdbDictionary::Table * aTable,
                            TupleIdRange & range, Uint64 & autoValue)
{
  DBUG_ENTER("Ndb::readAutoIncrementValue");
  assert(aTable != 0);

  const NdbTableImpl* const tableImpl = & NdbTableImpl::getImpl(*aTable);
  const int rc = readTupleIdFromNdb(tableImpl, range, autoValue);
  if (rc == -1)
    DBUG_RETURN(-1);

  DBUG_PRINT("info", ("value %lu", (ulong)autoValue));
  DBUG_RETURN(0);
}

int
Ndb::readTupleIdFromNdb(const NdbTableImpl* table,
                        TupleIdRange & range, Uint64 & tupleId)
{
  DBUG_ENTER("Ndb::readTupleIdFromNdb");
  if (range.m_first_tuple_id != range.m_last_tuple_id)
  {
    assert(range.m_first_tuple_id < range.m_last_tuple_id);
    tupleId = range.m_first_tuple_id + 1;
  }
  else
  {
    /*
     * peek at NEXTID.  does not reserve it so the value is valid
     * only if no other transactions are allowed.
     */
    Uint64 opValue = 0;
    if (opTupleIdOnNdb(table, range, opValue, 3) == -1)
      DBUG_RETURN(-1);
    tupleId = opValue;
  }
  DBUG_RETURN(0);
}

/****************************************************************************
int setAutoIncrementValue( const char* aTableName,
                           Uint64 autoValue,
                           bool modify);

Parameters:     aTableName (IN) : The table name.
                autoValue  (IN) : The new autoincrement value
                modify     (IN) : Modify existing value (not initialization)
Returns:        0 if succesful, -1 if error encountered
Remark:         Sets a new autoincrement value for the application.
****************************************************************************/
int
Ndb::setAutoIncrementValue(const char* aTableName,
                           Uint64 autoValue, bool modify)
{
  DBUG_ENTER("Ndb::setAutoIncrementValue");
  ASSERT_NOT_MYSQLD;
  BaseString internal_tabname(internalize_table_name(aTableName));

  Ndb_local_table_info *info=
    theDictionary->get_local_table_info(internal_tabname);
  if (info == 0) {
    theError.code = theDictionary->getNdbError().code;
    DBUG_RETURN(-1);
  }
  const NdbTableImpl* table = info->m_table_impl;
  TupleIdRange & range = info->m_tuple_id_range;
  if (setTupleIdInNdb(table, range, autoValue, modify) == -1)
    DBUG_RETURN(-1);
  DBUG_RETURN(0);
}

/*
  Set the auto-increment value for the table given as a dictionary
  object, using the tuple id range kept in the dictionary's local table
  info (looked up by the table's internal name).

  Returns 0 on success and -1 on failure; when the local table info is
  not found, theError.code is copied from the dictionary error.
*/
int
Ndb::setAutoIncrementValue(const NdbDictionary::Table * aTable,
                           Uint64 autoValue, bool modify)
{
  DBUG_ENTER("Ndb::setAutoIncrementValue");
  ASSERT_NOT_MYSQLD;
  assert(aTable != 0);

  const NdbTableImpl* const tableImpl = & NdbTableImpl::getImpl(*aTable);
  Ndb_local_table_info* const tableInfo =
    theDictionary->get_local_table_info(tableImpl->m_internalName);
  if (tableInfo == 0)
  {
    theError.code = theDictionary->getNdbError().code;
    DBUG_RETURN(-1);
  }

  const int rc = setTupleIdInNdb(tableImpl,
                                 tableInfo->m_tuple_id_range,
                                 autoValue, modify);
  if (rc == -1)
    DBUG_RETURN(-1);
  DBUG_RETURN(0);
}

/*
  Set the auto-increment value for aTable using a tuple id range
  supplied by the caller instead of the one kept in the local table
  info.  Returns 0 on success, -1 on failure.
*/
int
Ndb::setAutoIncrementValue(const NdbDictionary::Table * aTable,
                           TupleIdRange & range, Uint64 autoValue,
                           bool modify)
{
  DBUG_ENTER("Ndb::setAutoIncrementValue");
  assert(aTable != 0);

  const NdbTableImpl* const tableImpl = & NdbTableImpl::getImpl(*aTable);
  const int rc = setTupleIdInNdb(tableImpl, range, autoValue, modify);
  if (rc == -1)
    DBUG_RETURN(-1);
  DBUG_RETURN(0);
}

/*
  Apply a new tuple id (auto-increment) value.

  modify == false: NEXTID is written unconditionally (op 1).
  modify == true : nothing is done when checkTupleIdInNdb() reports that
                   a larger value is already known.  Otherwise the value
                   is served from the cached range when it falls inside
                   it, and only as a last resort is the database updated
                   (op 2).

  Returns 0 on success, -1 if the database operation failed.
*/
int
Ndb::setTupleIdInNdb(const NdbTableImpl* table,
                     TupleIdRange & range, Uint64 tupleId, bool modify)
{
  DBUG_ENTER("Ndb::setTupleIdInNdb");

  if (!modify)
  {
    /*
     * update NEXTID to given value.  reset cached range.
     */
    if (opTupleIdOnNdb(table, range, tupleId, 1) == -1)
      DBUG_RETURN(-1);
    DBUG_RETURN(0);
  }

  // A larger value has already been cached or seen: nothing to change.
  if (!checkTupleIdInNdb(range, tupleId))
    DBUG_RETURN(0);

  const bool haveCachedValues =
    (range.m_first_tuple_id != range.m_last_tuple_id);
  if (haveCachedValues)
  {
    assert(range.m_first_tuple_id < range.m_last_tuple_id);

    // The next cached value is already at or beyond the requested one.
    if (tupleId <= range.m_first_tuple_id + 1)
      DBUG_RETURN(0);

    // Requested value lies inside the cached range: advance the cache.
    if (tupleId <= range.m_last_tuple_id)
    {
      range.m_first_tuple_id = tupleId - 1;
      DBUG_PRINT("info", 
                 ("Setting next auto increment cached value to %lu",
                  (ulong)tupleId));  
      DBUG_RETURN(0);
    }
  }

  /*
   * if tupleId <= NEXTID, do nothing.  otherwise update NEXTID to
   * tupleId and set cached range to first = last = tupleId - 1.
   */
  if (opTupleIdOnNdb(table, range, tupleId, 2) == -1)
    DBUG_RETURN(-1);
  DBUG_RETURN(0);
}

int Ndb::initAutoIncrement()
{
  if (m_sys_tab_0)
    return 0;

  BaseString currentDb(getDatabaseName());
  BaseString currentSchema(getDatabaseSchemaName());

  setDatabaseName("sys");
  setDatabaseSchemaName("def");

  m_sys_tab_0 = theDictionary->getTableGlobal("SYSTAB_0");

  // Restore current name space
  setDatabaseName(currentDb.c_str());
  setDatabaseSchemaName(currentSchema.c_str());

  if (m_sys_tab_0 == NULL) {
    assert(theDictionary->m_error.code != 0);
    theError.code = theDictionary->m_error.code;
    return -1;
  }

  return 0;
}

/*
  Boolean form of checkTupleIdInNdb(): true when that check returns a
  non-zero result for autoValue, false otherwise.
*/
bool
Ndb::checkUpdateAutoIncrementValue(TupleIdRange & range, Uint64 autoValue)
{
  const int checkResult = checkTupleIdInNdb(range, autoValue);
  return checkResult != 0;
}

/*
  Decide from locally known values whether tupleId might require an
  update of the stored auto-increment value.

  Returns 0 when a value larger than tupleId has already been cached or
  read (no update needed), 1 otherwise.  No database access is made.
*/
int
Ndb::checkTupleIdInNdb(TupleIdRange & range, Uint64 tupleId)
{
  // Trace tag spelled to match the function name.
  DBUG_ENTER("Ndb::checkTupleIdInNdb");
  if ((range.m_first_tuple_id != ~(Uint64)0) &&
      (range.m_first_tuple_id > tupleId))
  {
   /*
    * If we have ever cached a value in this object and this cached
    * value is larger than the value we're trying to set then we
    * need not check with the real value in the SYSTAB_0 table.
    */
    DBUG_RETURN(0);
  }
  if (range.m_highest_seen > tupleId)
  {
    /*
     * Although we've never cached any higher value we have read
     * a higher value and again it isn't necessary to change the
     * auto increment value.
     */
    DBUG_RETURN(0);
  }
  DBUG_RETURN(1);
}


/*
  Run one operation against the NEXTID attribute of the SYSTAB_0 row
  whose SYSKEY_0 equals the table id, inside its own transaction which
  is executed with Commit and then closed.

  table   : table whose m_id is used as the key
  range   : cached tuple id range, updated according to op
  opValue : in/out, meaning depends on op
  op      : 0 - increment NEXTID by opValue; on return range covers the
                reserved values and opValue is set to range.m_first_tuple_id
            1 - write NEXTID = opValue and reset range
            2 - interpreted update that conditionally writes opValue to
                NEXTID; range.m_highest_seen is set from the NEXTID value
                read and the cached range is set to first = last =
                opValue - 1
            3 - read NEXTID into opValue and range.m_highest_seen
            any other value takes the error path

  Returns 0 on success, -1 on failure with theError set.
*/
int
Ndb::opTupleIdOnNdb(const NdbTableImpl* table,
                    TupleIdRange & range, Uint64 & opValue, Uint32 op)
{
  DBUG_ENTER("Ndb::opTupleIdOnNdb");
  Uint32 aTableId = table->m_id;
  DBUG_PRINT("enter", ("table: %u  value: %lu  op: %u",
                       aTableId, (ulong) opValue, op));

  NdbTransaction*    tConnection = NULL;
  NdbOperation*      tOperation = NULL;
  Uint64             tValue;
  NdbRecAttr*        tRecAttrResult;

  CHECK_STATUS_MACRO;

  // Make sure m_sys_tab_0 is available before it is used below.
  if (initAutoIncrement() == -1)
    goto error_handler;

  tConnection = this->startTransaction();
  if (tConnection == NULL)
    goto error_handler;

  tOperation = tConnection->getNdbOperation(m_sys_tab_0);
  if (tOperation == NULL)
    goto error_handler;

  switch (op)
    {
    case 0:
      // Reserve opValue ids: increment NEXTID and read it back.
      tOperation->interpretedUpdateTuple();
      tOperation->equal("SYSKEY_0", aTableId);
      tOperation->incValue("NEXTID", opValue);
      tRecAttrResult = tOperation->getValue("NEXTID");

      if (tConnection->execute( NdbTransaction::Commit ) == -1 )
        goto error_handler;

      tValue = tRecAttrResult->u_64_value();

      // The arithmetic below treats tValue as NEXTID after the increment,
      // so the reserved values are [tValue - opValue, tValue - 1].
      range.m_first_tuple_id = tValue - opValue;
      range.m_last_tuple_id  = tValue - 1;
      opValue = range.m_first_tuple_id; // out
      break;
    case 1:
      // create on first use
      tOperation->writeTuple();
      tOperation->equal("SYSKEY_0", aTableId );
      tOperation->setValue("NEXTID", opValue);

      if (tConnection->execute( NdbTransaction::Commit ) == -1 )
        goto error_handler;

      range.reset();
      break;
    case 2:
      // Interpreted program: register 1 holds opValue, register 2 holds
      // the current NEXTID.  The branch skips the write and jumps to
      // label 0 when the comparison described below holds.
      tOperation->interpretedUpdateTuple();
      tOperation->equal("SYSKEY_0", aTableId );
      tOperation->load_const_u64(1, opValue);
      tOperation->read_attr("NEXTID", 2);
      // compare NEXTID >= opValue
      tOperation->branch_le(2, 1, 0);
      tOperation->write_attr("NEXTID", 1);
      tOperation->interpret_exit_ok();
      tOperation->def_label(0);
      tOperation->interpret_exit_ok();
      tRecAttrResult = tOperation->getValue("NEXTID");
      if (tConnection->execute( NdbTransaction::Commit ) == -1)
      {
        goto error_handler;
      }
      else
      {
        range.m_highest_seen = tRecAttrResult->u_64_value();
        DBUG_PRINT("info", 
                   ("Setting next auto increment value (db) to %lu",
                    (ulong) opValue));  
        // The cached range is emptied (first == last) at opValue - 1,
        // whether or not the program above wrote NEXTID.
        range.m_first_tuple_id = range.m_last_tuple_id = opValue - 1;
      }
      break;
    case 3:
      // Plain read of NEXTID; nothing is reserved.
      tOperation->readTuple();
      tOperation->equal("SYSKEY_0", aTableId );
      tRecAttrResult = tOperation->getValue("NEXTID");
      if (tConnection->execute( NdbTransaction::Commit ) == -1 )
        goto error_handler;
      range.m_highest_seen = opValue = tRecAttrResult->u_64_value(); // out
      break;
    default:
      // Unknown op.  NOTE(review): no error code is set on this path, so
      // the DBUG_ASSERT in the error handler would presumably fire in
      // debug builds.
      goto error_handler;
    }

  this->closeTransaction(tConnection);

  DBUG_RETURN(0);

error_handler:
  DBUG_PRINT("error", ("ndb=%d con=%d op=%d",
             theError.code,
             tConnection != NULL ? tConnection->theError.code : -1,
             tOperation != NULL ? tOperation->theError.code : -1));

  // Use the first non-zero error code: Ndb object, then transaction,
  // then operation.
  if (theError.code == 0 && tConnection != NULL)
    theError.code = tConnection->theError.code;
  if (theError.code == 0 && tOperation != NULL)
    theError.code = tOperation->theError.code;
  DBUG_ASSERT(theError.code != 0);

  // Keep the error across closeTransaction() and put it back afterwards.
  NdbError savedError;
  savedError = theError;

  if (tConnection != NULL)
    this->closeTransaction(tConnection);

  theError = savedError;

  DBUG_RETURN(-1);
}

/*
  Reverse the byte order of a 32-bit word when built with
  WORDS_BIGENDIAN defined; otherwise return the word unchanged.
*/
Uint32
convertEndian(Uint32 Data)
{
#ifdef WORDS_BIGENDIAN
  const Uint32 lowest  = Data & 255;
  const Uint32 low     = (Data >> 8) & 255;
  const Uint32 high    = (Data >> 16) & 255;
  const Uint32 highest = (Data >> 24) & 255;
  return (lowest << 24) | (low << 16) | (high << 8) | highest;
#else
  return Data;
#endif
}

// <internal>
/* Return a reference to the cluster connection held by the implementation object. */
Ndb_cluster_connection &
Ndb::get_ndb_cluster_connection()
{
  Ndb_cluster_connection & connection = theImpl->m_ndb_cluster_connection;
  return connection;
}

/* Return the current catalog (database) name as stored in m_dbname. */
const char * Ndb::getCatalogName() const
{
  const char * const catalogName = theImpl->m_dbname.c_str();
  return catalogName;
}

/*
  Set the catalog (database) name and refresh the name prefix.

  A NULL name, or one containing table_name_separator, is ignored and 0
  is returned.  Returns -1 with theError.code = 4000 when storing the
  name or updating the prefix fails, 0 otherwise.
*/
int Ndb::setCatalogName(const char * a_catalog_name)
{
  // TODO can table_name_separator be escaped?
  if (a_catalog_name == 0)
    return 0;
  if (strchr(a_catalog_name, table_name_separator) != 0)
    return 0;

  if (!theImpl->m_dbname.assign(a_catalog_name) ||
      theImpl->update_prefix())
  {
    theError.code = 4000;
    return -1;
  }
  return 0;
}

/* Return the current schema name as stored in m_schemaname. */
const char * Ndb::getSchemaName() const
{
  const char * const schemaName = theImpl->m_schemaname.c_str();
  return schemaName;
}

/*
  Set the schema name and refresh the name prefix.

  A NULL name, or one containing table_name_separator, is ignored and 0
  is returned.  Returns -1 with theError.code = 4000 when storing the
  name or updating the prefix fails, 0 otherwise.
*/
int Ndb::setSchemaName(const char * a_schema_name)
{
  // TODO can table_name_separator be escaped?
  if (a_schema_name == 0)
    return 0;
  if (strchr(a_schema_name, table_name_separator) != 0)
    return 0;

  if (!theImpl->m_schemaname.assign(a_schema_name) ||
      theImpl->update_prefix())
  {
    theError.code = 4000;
    return -1;
  }
  return 0;
}
// </internal>
 
/* Database name accessor; same value as getCatalogName(). */
const char * Ndb::getDatabaseName() const
{
  const char * const databaseName = getCatalogName();
  return databaseName;
}
 
/* Set the database name; forwards to setCatalogName() and returns its result. */
int Ndb::setDatabaseName(const char * a_catalog_name)
{
  const int rc = setCatalogName(a_catalog_name);
  return rc;
}
 
const char * Ndb::getDatabaseSchemaName() const
{
  return getSchemaName();
}
 
/* Set the database schema name; forwards to setSchemaName() and returns its result. */
int Ndb::setDatabaseSchemaName(const char * a_schema_name)
{
  const int rc = setSchemaName(a_schema_name);
  return rc;
}

/*
  Select the database and schema that a table belongs to, taken from the
  table's internal name of the form <db>/<schema>/<rest>.

  Returns 0 when both names were extracted and set.  Returns -1 when the
  internal name lacks a non-empty <db> or <schema> component, when either
  component is longer than NAME_LEN, or when setting either name fails
  (the setters' failures were previously ignored and 0 was returned).
*/
int Ndb::setDatabaseAndSchemaName(const NdbDictionary::Table* t)
{
  const char* const dbBegin = t->m_impl.m_internalName.c_str();
  const char* const dbEnd = strchr(dbBegin, table_name_separator);
  if (dbEnd == 0 || dbEnd == dbBegin)
    return -1;

  const char* const schemaBegin = dbEnd + 1;
  const char* const schemaEnd = strchr(schemaBegin, table_name_separator);
  if (schemaEnd == 0 || schemaEnd == schemaBegin)
    return -1;

  // Both components must fit in buf together with the terminator.
  if (dbEnd - dbBegin > NAME_LEN || schemaEnd - schemaBegin > NAME_LEN)
    return -1;

  char buf[NAME_LEN + 1];
  sprintf(buf, "%.*s", (int) (dbEnd - dbBegin), dbBegin);
  if (setDatabaseName(buf) != 0)
    return -1;

  sprintf(buf, "%.*s", (int) (schemaEnd - schemaBegin), schemaBegin);
  if (setDatabaseSchemaName(buf) != 0)
    return -1;

  return 0;
}
 
/* Report the value of the fullyQualifiedNames flag. */
bool Ndb::usingFullyQualifiedNames()
{
  const bool qualified = fullyQualifiedNames;
  return qualified;
}
 
/*
  Return the external part of an internal table name.

  With fullyQualifiedNames set, the returned pointer is positioned just
  past the second table_name_separator (i.e. after the database and
  schema components), or at the terminating NUL if the name ends before
  that.  Otherwise the name is returned unchanged.  The result points
  into internalTableName; nothing is copied.
*/
const char *
Ndb::externalizeTableName(const char * internalTableName, bool fullyQualifiedNames)
{
  if (!fullyQualifiedNames)
    return internalTableName;

  const char *cursor = internalTableName;

  // First pass skips the database name, second pass the schema name.
  for (int component = 0; component < 2; component++)
  {
    while (*cursor != '\0')
    {
      const char current = *cursor++;
      if (current == table_name_separator)
        break;
    }
  }
  return cursor;
}

const char *
Ndb::externalizeTableName(const char * internalTableName)
{
  return externalizeTableName(internalTableName, usingFullyQualifiedNames());
}

/*
  Return the external part of an internal index name.

  With fullyQualifiedNames set, the result is the text following the
  last table_name_separator, or the whole name when it contains no
  separator.  Otherwise the name is returned unchanged.  The result
  points into internalIndexName; nothing is copied.
*/
const char *
Ndb::externalizeIndexName(const char * internalIndexName, bool fullyQualifiedNames)
{
  if (!fullyQualifiedNames)
    return internalIndexName;

  // strrchr replaces the manual backward scan, which stepped the pointer
  // to one element before the buffer when no separator was present.
  const char * const lastSeparator =
    strrchr(internalIndexName, table_name_separator);
  if (lastSeparator == 0)
    return internalIndexName;
  return lastSeparator + 1;
}

const char *
Ndb::externalizeIndexName(const char * internalIndexName)
{
  return externalizeIndexName(internalIndexName, usingFullyQualifiedNames());
}


/*
  Build the internal form of a table name.

  With fullyQualifiedNames set the result is m_prefix followed by
  external_name; otherwise it is external_name itself.
*/
const BaseString
Ndb::internalize_table_name(const char *external_name) const
{
  BaseString internalName;
  DBUG_ENTER("internalize_table_name");
  DBUG_PRINT("enter", ("external_name: %s", external_name));

  if (!fullyQualifiedNames)
  {
    internalName.assign(external_name);
  }
  else
  {
    /* Internal table name format <db>/<schema>/<table>
       <db>/<schema>/ is already available in m_prefix
       so just concat the two strings
     */
#ifdef VM_TRACE
    // verify that m_prefix looks like abc/def/
    const char* prefix = theImpl->m_prefix.c_str();
    const char* firstSep = prefix ? strchr(prefix, table_name_separator) : 0;
    const char* secondSep =
      firstSep ? strchr(firstSep + 1, table_name_separator) : 0;
    assert(firstSep && firstSep != prefix &&
           secondSep && secondSep != firstSep + 1 && *(secondSep + 1) == 0);
#endif
    internalName.assfmt("%s%s",
                        theImpl->m_prefix.c_str(),
                        external_name);
  }

  DBUG_PRINT("exit", ("internal_name: %s", internalName.c_str()));
  DBUG_RETURN(internalName);
}

/*
  Build the internal form of an index name using the current
  <db>/<schema>/ prefix (m_prefix).

  Returns an empty string when table is NULL.  With fullyQualifiedNames
  set the result is m_prefix, the table id, a separator and
  external_name; otherwise it is external_name itself.
*/
const BaseString
Ndb::old_internalize_index_name(const NdbTableImpl * table,
				const char * external_name) const
{
  BaseString internalName;
  DBUG_ENTER("old_internalize_index_name");
  DBUG_PRINT("enter", ("external_name: %s, table_id: %d",
                       external_name, table ? table->m_id : ~0));
  if (table == 0)
  {
    DBUG_PRINT("error", ("!table"));
    DBUG_RETURN(internalName);
  }

  if (!fullyQualifiedNames)
  {
    internalName.assign(external_name);
  }
  else
  {
    /* Internal index name format <db>/<schema>/<tabid>/<table> */
    internalName.assfmt("%s%d%c%s",
                        theImpl->m_prefix.c_str(),
                        table->m_id,
                        table_name_separator,
                        external_name);
  }

  DBUG_PRINT("exit", ("internal_name: %s", internalName.c_str()));
  DBUG_RETURN(internalName);
}

/*
  Build the internal form of an index name using the system prefix
  (m_systemPrefix).

  Returns an empty string when table is NULL.  With fullyQualifiedNames
  set the result is m_systemPrefix, the table id, a separator and
  external_name; otherwise it is external_name itself.
*/
const BaseString
Ndb::internalize_index_name(const NdbTableImpl * table,
                           const char * external_name) const
{
  BaseString internalName;
  DBUG_ENTER("internalize_index_name");
  DBUG_PRINT("enter", ("external_name: %s, table_id: %d",
                       external_name, table ? table->m_id : ~0));
  if (table == 0)
  {
    DBUG_PRINT("error", ("!table"));
    DBUG_RETURN(internalName);
  }

  if (!fullyQualifiedNames)
  {
    internalName.assign(external_name);
  }
  else
  {
    /* Internal index name format sys/def/<tabid>/<table> */
    internalName.assfmt("%s%d%c%s",
                        theImpl->m_systemPrefix.c_str(),
                        table->m_id,
                        table_name_separator,
                        external_name);
  }

  DBUG_PRINT("exit", ("internal_name: %s", internalName.c_str()));
  DBUG_RETURN(internalName);
}


/*
  Extract the database component of an internal name: everything before
  the first table_name_separator, or the whole name when there is no
  separator.

  If the scratch buffer pointer is NULL, errno is set to ENOMEM and
  BaseString(NULL) is returned.
*/
const BaseString
Ndb::getDatabaseFromInternalName(const char * internalName)
{
  const size_t bufferSize = strlen(internalName) + 1;
  char * const scratch = new char[bufferSize];
  if (scratch == NULL)
  {
    errno = ENOMEM;
    return BaseString(NULL);
  }
  strcpy(scratch, internalName);

  /* Cut the copy at the first table_name_separator, if any */
  char * const separator = strchr(scratch, table_name_separator);
  if (separator != NULL)
    *separator = '\0';

  BaseString ret = BaseString(scratch);
  delete [] scratch;
  return ret;
}
 
/*
  Extract the schema component of an internal name of the form
  <db>/<schema>/<rest>: the text between the first and the second
  table_name_separator (or up to the end of the name when there is no
  second separator).

  A name without any separator has no schema component and yields an
  empty string; previously this case read past the end of internalName.

  If the scratch buffer pointer is NULL, errno is set to ENOMEM and
  BaseString(NULL) is returned.
*/
const BaseString
Ndb::getSchemaFromInternalName(const char * internalName)
{
  const char * const firstSeparator =
    strchr(internalName, table_name_separator);
  if (firstSeparator == NULL)
    return BaseString();

  /* Everything after "<db>/" is copied, then cut at the next separator */
  const char * const schemaStart = firstSeparator + 1;
  char * const schemaName = new char[strlen(schemaStart) + 1];
  if (schemaName == NULL)
  {
    errno = ENOMEM;
    return BaseString(NULL);
  }
  strcpy(schemaName, schemaStart);

  char * const secondSeparator = strchr(schemaName, table_name_separator);
  if (secondSeparator != NULL)
    *secondSeparator = '\0';

  BaseString ret = BaseString(schemaName);
  delete [] schemaName;
  return ret;
}

// ToDo set event buffer size
/*
  Create an event operation through the event buffer and, on success,
  insert it at the head of this object's list of event operations
  (theImpl->m_ev_op).  Returns the new operation, or the null result of
  the event buffer call on failure.
*/
NdbEventOperation* Ndb::createEventOperation(const char* eventName)
{
  DBUG_ENTER("Ndb::createEventOperation");
  NdbEventOperation* const facade =
    theEventBuffer->createEventOperation(eventName, theError);
  if (facade == 0)
    DBUG_RETURN(facade);

  // keep track of all event operations: push the new one on the head
  NdbEventOperationImpl* const impl =
    NdbEventBuffer::getEventOperationImpl(facade);
  NdbEventOperationImpl* const previousHead = theImpl->m_ev_op;

  impl->m_prev= 0;
  impl->m_next= previousHead;
  if (previousHead)
    previousHead->m_prev= impl;
  theImpl->m_ev_op= impl;

  DBUG_RETURN(facade);
}

/*
  Unlink an event operation from this object's list of event operations
  and hand it to the event buffer to be dropped.  Always returns 0.
*/
int Ndb::dropEventOperation(NdbEventOperation* tOp)
{
  DBUG_ENTER("Ndb::dropEventOperation");
  DBUG_PRINT("info", ("name: %s", tOp->getEvent()->getTable()->getName()));

  // remove it from list
  NdbEventOperationImpl* const impl =
    NdbEventBuffer::getEventOperationImpl(tOp);
  NdbEventOperationImpl* const successor = impl->m_next;
  NdbEventOperationImpl* const predecessor = impl->m_prev;

  if (successor)
    successor->m_prev= predecessor;

  if (predecessor)
    predecessor->m_next= successor;
  else
    theImpl->m_ev_op= successor;   // impl was the head of the list

  DBUG_PRINT("info", ("first: %s",
                      theImpl->m_ev_op ? theImpl->m_ev_op->getEvent()->getTable()->getName() : "<empty>"));
  assert(theImpl->m_ev_op == 0 || theImpl->m_ev_op->m_prev == 0);

  theEventBuffer->dropEventOperation(tOp);
  DBUG_RETURN(0);
}

/*
  Iterate over this object's event operations.  With tOp == 0 the first
  operation in the list is returned; otherwise the one following tOp.
  Returns 0 when there is no such operation.
*/
NdbEventOperation *Ndb::getEventOperation(NdbEventOperation* tOp)
{
  NdbEventOperationImpl* const impl =
    tOp ? NdbEventBuffer::getEventOperationImpl(tOp)->m_next
        : theImpl->m_ev_op;

  if (impl == 0)
    return 0;
  return impl->m_facade;
}

/* Forward to the event buffer's pollEvents() and return its result. */
int
Ndb::pollEvents(int aMillisecondNumber, Uint64 *latestGCI)
{
  const int result =
    theEventBuffer->pollEvents(aMillisecondNumber, latestGCI);
  return result;
}

/* Forward to the event buffer's flushIncompleteEvents() and return its result. */
int
Ndb::flushIncompleteEvents(Uint64 gci)
{
  const int result = theEventBuffer->flushIncompleteEvents(gci);
  return result;
}

/* Forward to the event buffer's nextEvent() and return its result. */
NdbEventOperation *Ndb::nextEvent()
{
  NdbEventOperation * const next = theEventBuffer->nextEvent();
  return next;
}

/*
  Ask the event buffer for the next event operation of the current GCI
  iteration and return its public facade, or NULL when the event buffer
  returns no operation.
*/
const NdbEventOperation*
Ndb::getGCIEventOperations(Uint32* iter, Uint32* event_types)
{
  NdbEventOperationImpl* const impl =
    theEventBuffer->getGCIEventOperations(iter, event_types);
  if (impl == NULL)
    return NULL;
  return impl->m_facade;
}

/* Forward to the event buffer's getLatestGCI() and return its result. */
Uint64 Ndb::getLatestGCI()
{
  const Uint64 latest = theEventBuffer->getLatestGCI();
  return latest;
}

/*
  Set the GCI slip threshold of the event buffer.

  The previous body was identical to setReportThreshEventFreeMem(): it
  overwrote the free-memory thresholds (m_free_thresh, m_min_free_thresh,
  m_max_free_thresh), so the two settings clobbered each other and the
  GCI slip threshold was never set.

  NOTE(review): assumes NdbEventBuffer declares the member
  m_gci_slip_thresh (not visible in this file) - confirm against
  NdbEventOperationImpl.hpp.
*/
void Ndb::setReportThreshEventGCISlip(unsigned thresh)
{
  if (theEventBuffer->m_gci_slip_thresh != thresh)
  {
    theEventBuffer->m_gci_slip_thresh= thresh;
  }
}

/*
  Set the free-memory reporting threshold of the event buffer.  When the
  value differs from the current one, m_free_thresh and
  m_min_free_thresh are set to thresh and m_max_free_thresh to 100.
*/
void Ndb::setReportThreshEventFreeMem(unsigned thresh)
{
  // Unchanged threshold: leave all three values as they are.
  if (theEventBuffer->m_free_thresh == thresh)
    return;

  theEventBuffer->m_free_thresh= thresh;
  theEventBuffer->m_min_free_thresh= thresh;
  theEventBuffer->m_max_free_thresh= 100;
}

#ifdef VM_TRACE
#include <NdbMutex.h>
extern NdbMutex *ndb_print_state_mutex;

/*
  Return true when the first `no` entries of list contain the same
  pointer more than once, false otherwise.
*/
static bool
checkdups(NdbTransaction** list, unsigned no)
{
  for (unsigned first = 0; first < no; first++)
  {
    NdbTransaction* const candidate = list[first];
    for (unsigned second = first + 1; second < no; second++)
    {
      if (candidate == list[second])
        return true;
    }
  }
  return false;
}
void
Ndb::printState(const char* fmt, ...)
{
  char buf[200];
  va_list ap;
  va_start(ap, fmt);
  vsprintf(buf, fmt, ap);
  va_end(ap);
  NdbMutex_Lock(ndb_print_state_mutex);
  bool dups = false;
  unsigned i;
  ndbout << buf << " ndb=" << hex << (void*)this << endl;
  for (unsigned n = 0; n < MAX_NDB_NODES; n++) {
    NdbTransaction* con = theConnectionArray[n];
    if (con != 0) {
      ndbout << "conn " << n << ":" << endl;
      while (con != 0) {
        con->printState();
        con = con->theNext;
      }
    }
  }
  ndbout << "prepared: " << theNoOfPreparedTransactions<< endl;
  if (checkdups(thePreparedTransactionsArray, theNoOfPreparedTransactions)) {
    ndbout << "!! DUPS !!" << endl;
    dups = true;
  }
  for (i = 0; i < theNoOfPreparedTransactions; i++)
    thePreparedTransactionsArray[i]->printState();
  ndbout << "sent: " << theNoOfSentTransactions<< endl;
  if (checkdups(theSentTransactionsArray, theNoOfSentTransactions)) {
    ndbout << "!! DUPS !!" << endl;
    dups = true;
  }
  for (i = 0; i < theNoOfSentTransactions; i++)
    theSentTransactionsArray[i]->printState();
  ndbout << "completed: " << theNoOfCompletedTransactions<< endl;
  if (checkdups(theCompletedTransactionsArray, theNoOfCompletedTransactions)) {
    ndbout << "!! DUPS !!" << endl;
    dups = true;
  }
  for (i = 0; i < theNoOfCompletedTransactions; i++)
    theCompletedTransactionsArray[i]->printState();
  NdbMutex_Unlock(ndb_print_state_mutex);
}
#endif


